[[micrometer]]
= Monitoring

[[monitoring-listener-performance]]
== Monitoring Listener Performance

Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer`+++s+++ for the listener, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the `ContainerProperties`+++'+++ `micrometerEnabled` property to `false`.

Two timers are maintained - one for successful calls to the listener and one for failures.

The timers are named `spring.kafka.listener` and have the following tags:

* `name` : (container bean name)
* `result` : `success` or `failure`
* `exception` : `none` or `ListenerExecutionFailedException`

You can add additional tags using the `ContainerProperties`+++'+++s `micrometerTags` property.

Starting with versions 2.9.8 and 3.0.6, you can provide a function in the `ContainerProperties`+++'+++ `micrometerTagsProvider` property; the function receives the `ConsumerRecord<?, ?>` and returns tags, which can be based on that record and are merged with any static tags in `micrometerTags`.

NOTE: With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`.
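
The following is a minimal sketch of how the static and per-record tag options described above might be combined.
The class name, the `orders` topic, and the tag names and values are hypothetical; only `setMicrometerTags` and `setMicrometerTagsProvider` come from `ContainerProperties`.

[source, java]
----
import java.util.Map;

import org.springframework.kafka.listener.ContainerProperties;

public class ListenerTimerTags {

    // Hypothetical helper; the topic and tag names are for illustration only.
    public static ContainerProperties containerProperties() {
        ContainerProperties props = new ContainerProperties("orders");
        // Static tags, added to every listener timer.
        props.setMicrometerTags(Map.of("team", "payments"));
        // Per-record tags, merged with the static tags above.
        props.setMicrometerTagsProvider(rec -> Map.of("topic", rec.topic()));
        return props;
    }

}
----

Tagging by topic keeps the number of distinct tag values small; tags derived from record keys or values could create a very large number of timers.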

[[monitoring-kafkatemplate-performance]]
== Monitoring KafkaTemplate Performance

Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s+++ for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the template's `micrometerEnabled` property to `false`.

Two timers are maintained - one for successful send operations and one for failures.

The timers are named `spring.kafka.template` and have the following tags:

* `name` : (template bean name)
* `result` : `success` or `failure`
* `exception` : `none` or the exception class name for failures

You can add additional tags using the template's `micrometerTags` property.

Starting with versions 2.9.8 and 3.0.6, you can provide a function by calling `KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)`; the function receives the `ProducerRecord<?, ?>` and returns tags, which can be based on that record and are merged with any static tags in `micrometerTags`.
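
As a sketch of the template-side equivalent, the following configures both static and per-record tags on a `KafkaTemplate`.
The class name and the tag names and values are hypothetical, and the `ProducerFactory` is assumed to be defined elsewhere.

[source, java]
----
import java.util.Map;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class TemplateTimerTags {

    // Hypothetical helper; the tag names and values are for illustration only.
    public static KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        // Static tags, added to every send timer.
        template.setMicrometerTags(Map.of("team", "payments"));
        // Per-record tags, merged with the static tags above.
        template.setMicrometerTagsProvider(rec -> Map.of("topic", rec.topic()));
        return template;
    }

}
----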

[[micrometer-native]]
== Micrometer Native Metrics

Starting with version 2.5, the framework provides xref:kafka/connecting.adoc#factory-listeners[Factory Listeners] to manage a Micrometer `KafkaClientMetrics` instance whenever producers and consumers are created and closed.

To enable this feature, simply add the listeners to your producer and consumer factories:

[source, java]
----
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}
----

The consumer/producer `id` passed to the listener is added to the meter's tags with tag name `spring.id`.

.An example of obtaining one of the Kafka metrics
[source, java]
----
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();
----

A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support].

[[observation]]
== Micrometer Observation

Starting with version 3.0, Micrometer observation is supported for the `KafkaTemplate` and listener containers.

Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation.

IMPORTANT: Micrometer Observation does not support batch listeners; with a batch listener, Micrometer Timers are enabled instead.
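
A minimal sketch of enabling observation on both sides might look like the following.
The class and method names are hypothetical, and the producer and consumer factories are assumed to be defined elsewhere.

[source, java]
----
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class ObservationConfig {

    public static KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        // Observations replace the Micrometer timers for send operations.
        template.setObservationEnabled(true);
        return template;
    }

    public static ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
            ConsumerFactory<String, String> cf) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        // Applies to every container created by this factory.
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }

}
----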

Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.

To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` on the template or listener container, respectively.

The default implementations add the `bean.name` tag for template observations and `listener.id` tag for containers.

You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations.

See xref:appendix/micrometer.adoc#observation-gen[Micrometer Observation Documentation] for details of the default observations that are recorded.

Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records.
To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively.
The `record` property in both observation contexts contains the `ConsumerRecord` or `ProducerRecord` respectively.
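
As an illustration, the following sketch subclasses the default listener convention and adds a tag taken from the consumer record.
The class name and the `topic` tag are hypothetical choices, not part of the framework.

[source, java]
----
import io.micrometer.common.KeyValues;

import org.springframework.kafka.support.micrometer.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;

public class TopicTaggingListenerConvention extends DefaultKafkaListenerObservationConvention {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
        // Keep the default tags and add one derived from the consumer record.
        return super.getLowCardinalityKeyValues(context)
                .and("topic", context.getRecord().topic());
    }

}
----

An instance of such a class would then be passed to the `ContainerProperties` `setObservationConvention` method; a template-side convention can be written the same way against `DefaultKafkaTemplateObservationConvention` and `KafkaRecordSenderContext`.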

The sender and receiver contexts`' `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
If, for some reason (perhaps a lack of admin permissions), you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into `KafkaTemplate`+++s+++ and listener containers.
When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.
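
The following sketch shows one way a manual cluster id might be supplied for template observations.
The class name, the broker address, and the `my-cluster` value are hypothetical, and the `ProducerFactory` is assumed to be defined elsewhere.

[source, java]
----
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class ManualClusterId {

    public static KafkaAdmin admin() {
        // Hypothetical broker address.
        KafkaAdmin admin = new KafkaAdmin(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
        // With a manual id set, describeCluster is not needed to resolve it.
        admin.setClusterId("my-cluster");
        return admin;
    }

    public static KafkaTemplate<String, String> template(
            ProducerFactory<String, String> pf, KafkaAdmin admin) {

        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.setObservationEnabled(true);
        // Inject the admin that carries the manual cluster id.
        template.setKafkaAdmin(admin);
        return template;
    }

}
----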
